Avoid These Mistakes While Writing Apache Spark Program


The importance of writing clean code is immense: it makes bugs easier to fix and the output easier to optimize. As the saying goes, "prevention is better than cure", and the same applies to programming: avoiding mistakes is better than fixing them afterwards. So let us take a quick look at some common mistakes to avoid while writing an Apache Spark program.

Number 1: Deciding the number of executors, cores, and memory

 

Deciding the number of executors, cores, and memory looks straightforward, and beginners usually just follow a common recipe, but a moment of doubt actually helps here. Here is why. The most common mistake is to go for the most granular configuration: many tiny executors, each with a single core and very little memory. This is wrong because it throws away the benefit of running multiple tasks inside the same executor, and in short it becomes a case of poor utilization of the available resources.

The second mistake is the opposite extreme: going least granular and handing one huge executor per node all of that node's memory and cores. This is also wrong because it leaves no overhead for the OS or the Hadoop daemons. A worked sizing sketch follows below.
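For illustration only, here is a hedged sizing sketch for a hypothetical cluster of 6 worker nodes with 16 cores and 64 GB of RAM each; the node sizes and the roughly-5-cores-per-executor heuristic are assumptions, not figures from this article.

import org.apache.spark.SparkConf

// Hypothetical cluster: 6 worker nodes, 16 cores and 64 GB RAM each.
// Leave about 1 core and 1 GB per node for the OS and Hadoop daemons,
// then size executors between the two extremes described above.
val sizingConf = new SparkConf()
  .setAppName("executor-sizing-sketch")      // placeholder application name
  .set("spark.executor.cores", "5")          // ~5 cores per executor is a commonly cited heuristic
  .set("spark.executor.instances", "17")     // (15 usable cores / 5) * 6 nodes, minus 1 for the ApplicationMaster
  .set("spark.executor.memory", "19g")       // ~63 GB usable / 3 executors per node, minus ~7% memory overhead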

Dynamic allocation is a good middle ground: it lets Spark allocate cluster resources dynamically, getting as much as is needed and nothing more. Based on the workload, Spark scales the number of executors up and down: idle executors are removed, and new executors are requested when there are pending tasks waiting for them. Note that this works with Spark on YARN (together with the external shuffle service), and it takes the guesswork out of choosing the number of executors. To enable it, simply set spark.dynamicAllocation.enabled to true in the Spark config, as in the sketch below.
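A minimal sketch of the corresponding configuration; the property names are standard Spark settings, while the min/max bounds are arbitrary example values.

import org.apache.spark.SparkConf

// Dynamic allocation relies on the external shuffle service so that shuffle
// files outlive the executors that wrote them.
val dynConf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")     // example lower bound
  .set("spark.dynamicAllocation.maxExecutors", "20")    // example upper bound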

Number 2: Avoid Large Shuffle Block Size

 

One strange reason for application failures is the Spark shuffle. In MapReduce terms, a shuffle block is the file written by one mapper for one reducer; the reducer fetches a local copy of that block and then reduces it. A Spark shuffle block cannot be larger than 2 GB: if a block exceeds that size, the job fails with an overflow exception. The reason is that Spark uses ByteBuffer as the abstraction for blocks, and a ByteBuffer is limited to about 2 GB. This hits Spark SQL users in particular, because the default number of shuffle partitions is only 200. To solve the problem, reduce the average partition size by increasing the number of partitions, for example by raising spark.sql.shuffle.partitions or calling repartition(). coalesce() goes the other way (it merges partitions) and is useful for running operations efficiently after a large dataset has been filtered. A small sketch follows.
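Here is a small sketch of both knobs, assuming a hypothetical input path and column name.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shuffle-partition-sketch")                 // placeholder application name
  .config("spark.sql.shuffle.partitions", "400")       // raise from the default of 200 to shrink each shuffle block
  .getOrCreate()

val df = spark.read.parquet("/path/to/input")          // hypothetical input path

// More partitions -> smaller partitions -> smaller shuffle blocks.
val wellSized = df.repartition(400)

// After a selective filter, coalesce() merges the now mostly-empty partitions
// so that downstream operations run efficiently.
val filtered = df.filter("status = 'active'").coalesce(50)   // hypothetical column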

Number 3: Use of groupByKey for associative operations

 

reduceByKey and groupByKey produce the same result in many cases, but reduceByKey should be preferred, especially on larger datasets: it combines the output for each key on every partition before shuffling the data, which greatly reduces the amount of data that needs to be moved across the network. groupByKey, by contrast, shuffles every key-value pair first. The sketch below illustrates both.
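A minimal word-count style sketch with made-up data; sc is assumed to be an existing SparkContext.

// Pair RDD of (word, 1) counts; the data here is made up for illustration.
val pairs = sc.parallelize(Seq(("spark", 1), ("hadoop", 1), ("spark", 1)))

// Preferred: values for each key are combined on every partition
// before the shuffle, so far less data crosses the network.
val counts = pairs.reduceByKey(_ + _)

// Same result, but every (key, value) pair is shuffled first and only
// summed afterwards.
val countsViaGroup = pairs.groupByKey().mapValues(_.sum)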

Number 4: Never neglect serialization

 

Serialization plays an important role in any distributed application. Formats that are slow to serialize objects, or that consume a large number of bytes, slow down the computation. A Spark application should therefore always be tuned for serialization, for example by switching to a faster serializer such as Kryo. The example below shows how to turn on Kryo and register the classes that will be serialized.

import org.apache.spark.SparkConf
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

val conf = new SparkConf()
  .setAppName(...)
  .setMaster(...)
  // Use Kryo instead of the default Java serializer.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Maximum and initial size of Kryo's serialization buffer.
  .set("spark.kryoserializer.buffer.max", "128m")
  .set("spark.kryoserializer.buffer", "64m")
  // Register the classes that are subject to serialization.
  .registerKryoClasses(Array(classOf[ArrayBuffer[String]], classOf[ListBuffer[String]]))
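Registering classes lets Kryo write a compact numeric identifier instead of the full class name with every serialized object. If you want Spark to fail fast whenever an unregistered class slips through, you can also set spark.kryo.registrationRequired to true.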

Number 5: Avoiding the flatMap-join-groupBy pattern

 

When two datasets are already grouped by key and you want to join them while keeping them grouped, use cogroup rather than the flatMap-join-groupBy pattern. This avoids the overhead of unpacking and repacking the groups, as in the sketch below.
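A hedged sketch with made-up pair RDDs; sc is assumed to be an existing SparkContext.

// Two datasets keyed by user; the data is made up for illustration.
val clicks = sc.parallelize(Seq(("user1", "pageA"), ("user1", "pageB"), ("user2", "pageC")))
val purchases = sc.parallelize(Seq(("user1", 9.99), ("user2", 4.50)))

// cogroup yields (key, (Iterable[String], Iterable[Double])) in one pass,
// with the values already grouped, instead of a join followed by groupByKey.
val groupedTogether = clicks.cogroup(purchases)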

Wrapping up

 

Avoiding the issues above will help you build a better, more reliable, and more efficient application with Apache Spark.

 

About the Author

Tao is a passionate software engineer who works at a leading big data analysis company in Silicon Valley. Previously, Tao worked at large IT companies such as IBM and Cisco. He holds an MS degree in Computer Science from McGill University and has many years of experience as a teaching assistant for various computer science classes.

He is the author of several best-selling courses on Udemy with more than 39,000 students.

Check out all the DevOps and Big Data courses Tao teaches.
