
Using Spark with Avro files
Avro is a popular data serialization system that provides a compact, fast binary data format. Avro files are self-describing because the schema is stored along with the data.
You can download the spark-avro connector JAR from https://mvnrepository.com/artifact/com.databricks/spark-avro_2.11/3.2.0.
Start the Spark shell with the spark-avro JAR included in the session:
Aurobindos-MacBook-Pro-2:spark-2.1.0-bin-hadoop2.7 aurobindosarkar$ bin/spark-shell --jars /Users/aurobindosarkar/Downloads/spark-avro_2.11-3.2.0.jar
We will use the JSON file from the previous section containing the Amazon reviews data to create the Avro file. Create a DataFrame from the input JSON file and display the number of records:
scala> import com.databricks.spark.avro._
scala> val reviewsDF = spark.read.json("file:///Users/aurobindosarkar/Downloads/reviews_Electronics_5.json")
scala> reviewsDF.count()
res4: Long = 1689188
Next, we keep only the reviews with an overall rating below 3, coalesce the output into a single partition (so that a single file is written), and write the resulting DataFrame to an Avro file:
scala> reviewsDF.filter("overall < 3").coalesce(1).write.avro("file:///Users/aurobindosarkar/Downloads/amazon_reviews/avro")
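Note that coalesce(1) funnels the entire write through a single task, which is convenient for small results but slow for large ones. If you prefer the generic data source API over the imported avro() convenience method, the same write can be sketched by naming the connector's format explicitly:
scala> reviewsDF.filter("overall < 3").coalesce(1).write.format("com.databricks.spark.avro").save("file:///Users/aurobindosarkar/Downloads/amazon_reviews/avro")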
Next, we read the Avro file back by creating a DataFrame from the file written in the previous step and display the number of records in it:
scala> val reviewsAvroDF = spark.read.avro("file:///Users/aurobindosarkar/Downloads/amazon_reviews/avro/part-00000-c6b6b423-70d6-440f-acbe-0de65a6a7f2e.avro")
scala> reviewsAvroDF.count()
res5: Long = 190864
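Because Avro files are self-describing, the schema here is read directly from the file rather than inferred. You can confirm that it matches the original JSON-derived schema (output elided):
scala> reviewsAvroDF.printSchema()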
Next, we select a few columns and display five records from the resulting DataFrame by specifying show(5):
scala> reviewsAvroDF.select("asin", "helpful", "overall", "reviewTime", "reviewerID", "reviewerName").show(5)

Next, we specify compression options for Avro files by setting the Spark session configuration values:
scala> spark.conf.set("spark.sql.avro.compression.codec", "deflate")
scala> spark.conf.set("spark.sql.avro.deflate.level", "5")
Now, when we write out the DataFrame, the Avro file is stored in a compressed format (the target directory here is illustrative):
scala> reviewsAvroDF.write.avro("file:///Users/aurobindosarkar/Downloads/amazon_reviews/avro/compressed")
You can also write out the DataFrame partitioned by a specific column. Here, we partition based on the overall column (which, after the earlier filter, contains only values below 3):
scala> reviewsAvroDF.write.partitionBy("overall").avro("file:///Users/aurobindosarkar/Downloads/amazon_reviews/avro/partitioned")
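The partitioned output can be read back as a single DataFrame: Spark reconstructs the overall column from the partition directory names and can skip directories when you filter on that column. A minimal sketch (the 1.0 rating value is an assumption based on the earlier < 3 filter):
scala> val partitionedDF = spark.read.avro("file:///Users/aurobindosarkar/Downloads/amazon_reviews/avro/partitioned")
scala> partitionedDF.filter("overall = 1.0").count()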
A screenshot of the Avro files from this session is shown here. Notice the size of the compressed version (67 MB) versus the original file (97.4 MB). Additionally, notice the two separate directories created for the partitioned (by overall values) Avro files.
