This is the last blog post of my introduction series for Oracle Big Data Cloud Service – Compute Edition. In this blog post, I’ll cover “Apache Pig”. It’s a tool/platform created at “Yahoo!” to analyze large data sets without the complexities of writing a traditional MapReduce program. It’s designed to process any kind of data (structured or unstructured), so it’s a great tool for ETL jobs. Pig comes installed and ready to use with “Oracle Big Data Cloud Service – Compute Edition”. In this blog post, I’ll show how we can use Pig to read, parse and analyze data.

Pig has a high-level SQL-like programming language called Pig Latin. We need to learn the basics of this language to be able to use Pig. Each statement in a Pig script is processed by the Pig interpreter to build a logical plan, which is later used to produce MapReduce jobs. The steps in the logical plan are not “executed” until a DUMP or STORE statement is used (a minimal sketch after the list below illustrates this).

Pig scripts generally have the following structure:

  1. Data is read by using LOAD statements.
  2. Data is transformed/processed.
  3. The result is dumped (to screen) or stored to a file (or a Hive table).
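
To give you an idea of how these three steps fit together (and of the lazy evaluation mentioned above), here is a minimal sketch; the paths, field names and the filter condition are made up for this example:

raw_data = LOAD '/user/hypothetical/input.csv' USING PigStorage(',') AS (id:chararray, amount:int); -- 1. read the data
big_rows = FILTER raw_data BY amount > 100;                                                         -- 2. transform it (still no MapReduce job is started)
STORE big_rows INTO '/user/hypothetical/output' USING PigStorage(',');                              -- 3. store the result; only now does Pig launch a job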

In my previous post, I used PySpark (the Spark Python API) to load flight data and did a very simple analysis using Spark SQL. I’ll do the same thing with the same data using Pig.

Pig has an interactive shell named “Grunt”. Instead of writing Pig Latin statements to a file and executing it with the pig command, we can enter each statement into Grunt and run them interactively. To enter the Grunt shell, I’ll log in to my first node, switch to the hdfs user (to be able to reach the CSV files I used in my previous blog post) and then run the pig executable (pig) in the terminal:

[root@bilyonveri-bdcsce-1 ~]# sudo su - hdfs
[hdfs@bilyonveri-bdcsce-1 ~]$ pig
WARNING: Use "yarn jar" to launch YARN applications.
17/06/22 06:40:55 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
17/06/22 06:40:55 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
17/06/22 06:40:55 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
2017-06-22 06:40:55,549 [main] INFO org.apache.pig.Main - Apache Pig version 0.15.0 (r: unknown) compiled Apr 20 2017, 18:50:24
2017-06-22 06:40:55,549 [main] INFO org.apache.pig.Main - Logging error messages to: /var/lib/hadoop-hdfs/pig_1498128055548.log
2017-06-22 06:40:55,566 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /var/lib/hadoop-hdfs/.pigbootup not found
2017-06-22 06:40:55,916 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine
- Connecting to hadoop file system at: hdfs://mycluster
2017-06-22 06:40:56,598 [main] INFO org.apache.pig.PigServer
- Pig Script ID for the session: PIG-default-43bfbd27-07c7-42ee-8070-8aa6af5e7646
2017-06-22 06:40:56,933 [main] INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl
- Timeline service address: http://bilyonveri-bdcsce-2.compute-trbilyonveri.oraclecloud.internal:8188/ws/v1/timeline/
2017-06-22 06:40:57,020 [main] INFO org.apache.pig.backend.hadoop.ATSService - Created ATS Hook
grunt> 2017-06-22 06:40:57,157 [ATS Logger 0] ERROR org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl
- Failed to get the response from the timeline server.
2017-06-22 06:40:57,157 [ATS Logger 0] INFO org.apache.pig.backend.hadoop.ATSService
- Failed to submit plan to ATS: Failed to get the response from the timeline server.
grunt>

Grunt can also be used to run HDFS commands, so I’ll run the following commands to see if I can access the CSV files (2008.csv, carriers.csv). These files are located in the /user/zeppelin folder because I used Zeppelin to download them.

grunt> fs -ls /user/zeppelin/*.csv
grunt> fs -tail /user/zeppelin/carriers.csv

When I ran the above commands, I saw both 2008.csv and carriers.csv listed, as well as the last lines of carriers.csv. So I’m sure that my Pig script will also be able to access them.

So first of all, I need to read the data from these CSV files. The following statement tells Pig to read data from carriers.csv and declares a data set (relation) named “carriers_file”:

carriers_file = LOAD '/user/zeppelin/carriers.csv';

We can tell the LOAD statement that the input is a CSV file, that rows are not multi-line, that the file uses UNIX-style line endings, and that the header row should be skipped. Don’t worry, it’s a simple and commonly used option; all we need to do is pass some parameters to the storage function:

carriers_file = LOAD '/user/zeppelin/carriers.csv'
USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER');

We can also define our schema while loading data:

carriers_file = LOAD '/user/zeppelin/carriers.csv'
USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER')
AS (Code:chararray, Description:chararray);

Be careful about the case-sensitivity of names. In Pig scripts, relation (alias) names are case sensitive while Pig Latin keywords are not. So if we want to use carriers_file in the next statements, we need to write “carriers_file” (all lower case). I won’t define the schema on load for the flight data, because the 2008.csv file has lots of fields we do not use and I don’t want to define all of them.
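
For example, here’s a quick sketch of that rule (the sample alias is made up):

carriers_file = load '/user/zeppelin/carriers.csv'; -- fine: keywords such as LOAD/load are case-insensitive
sample_rows = LIMIT carriers_file 5;                -- fine: the alias is written exactly as it was defined
-- sample_rows = LIMIT Carriers_File 5;             -- would fail: "Carriers_File" is an undefined alias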

carriers = FOREACH carriers_file GENERATE REPLACE($0, '"', '') as Code, REPLACE($1, '"', '') as Description;

If you do not define a schema for a data set, you can access the fields using the $0, $1 … positional variables. In our CSV files, fields are quoted (“ZMZ”,”Winnipesaukee Aviation Inc.”), so I also need to remove the quote characters using the REPLACE function. To do that, I need to tell Pig to process each row in the “carriers_file” data set; the FOREACH … GENERATE statement is used for this. So the above statement processes each row, removes the '"' quote characters from $0 and $1 (the first and second columns) and names them Code and Description.
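
If you want to check what the resulting schema looks like, you can use the DESCRIBE command. The exact output may differ slightly, but it should be roughly like this (REPLACE returns chararray):

DESCRIBE carriers;
-- carriers: {Code: chararray, Description: chararray}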

As you will see, these commands take no time to process, and they won’t trigger any real work (a MapReduce job). If you want to see the content of a data set, you can use the DUMP command. But first, don’t forget to limit the data. For example, I can use the following statements to get the first 10 rows of the “carriers” data set:

first_10_carriers = LIMIT carriers 10;
DUMP first_10_carriers;
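
If you’re curious about what Pig will actually do before triggering a real job, the EXPLAIN and ILLUSTRATE commands can help; roughly speaking, EXPLAIN prints the logical/physical/MapReduce plans and ILLUSTRATE runs a small sampled walk-through of the steps:

EXPLAIN first_10_carriers;    -- show the plans Pig built for this data set, without running the full job
ILLUSTRATE first_10_carriers; -- show how a small sample of rows flows through each step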

I will do a very similar thing with 2008.csv, but it will be a little bit more complex because there are more columns, and I also define the data type for each column:

flights_file = LOAD '/user/zeppelin/2008.csv'
USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER');
flights = FOREACH flights_file GENERATE
(chararray)$8 as Code:chararray,
(chararray)$22 as CancellationCode:chararray,
(int)REPLACE($24, 'NA', '0') as CarrierDelay:int,
(int)REPLACE($25, 'NA', '0') as WeatherDelay:int,
(int)REPLACE($26, 'NA', '0') as NASDelay:int,
(int)REPLACE($27, 'NA', '0') as SecurityDelay:int,
(int)REPLACE($28, 'NA', '0') as LateAircraftDelay:int;

In my last query I will only use “CarrierDelay”, but I wanted to show how we can get the other columns. This time I didn’t remove the quote characters; instead, I replaced ‘NA’ values with 0. You might wonder why we put “(int)” before the REPLACE calls, like this:

(int)REPLACE($24, 'NA', '0') as CarrierDelay:int,

With this, I tell Pig that “the REPLACE call will process the 25th field in “flights_file” and return an integer, and I want to store it as CarrierDelay (an integer column)”. I define the data types strictly to prevent casting problems in the MapReduce jobs. This is a very important tip, especially if you use a release earlier than Pig 0.17.
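
Again, DESCRIBE can be used to verify that the declared types are what we expect; the output should look roughly like this:

DESCRIBE flights;
-- flights: {Code: chararray, CancellationCode: chararray, CarrierDelay: int, WeatherDelay: int, NASDelay: int, SecurityDelay: int, LateAircraftDelay: int}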

Now we need to join these two data sets, so we will use the JOIN command:

flights_join = JOIN flights BY Code, carriers BY Code;
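
As a side note (I don’t use it in the rest of this post), carriers.csv is tiny compared to the flight data, so this join is a good candidate for Pig’s fragment-replicate join, which keeps the small data set in memory on the map side and skips the reduce phase of the join. A sketch of that alternative, under a different alias:

flights_join_rep = JOIN flights BY Code, carriers BY Code USING 'replicated'; -- carriers (the small relation) must be listed last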

Then we group the joined data by the Code and the Description columns:

flight_grouped = GROUP flights_join by (flights::Code, carriers::Description);
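
Before writing the final query, it helps to look at the grouped schema, because the next statement relies on the “group” tuple and the “flights_join” bag inside each row. DESCRIBE should print something roughly like this:

DESCRIBE flight_grouped;
-- flight_grouped: {group: (flights::Code, carriers::Description), flights_join: {( ...joined flight and carrier fields... )}}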

In my previous blog post, I found the total number of delayed flights, the total number of flights, the average delay time (based on CarrierDelay) and the ratio of delayed/total flights for each airline/carrier, and sorted them. That was very easy to do with SQL; Pig requires more thinking to build such a query:

result = FOREACH flight_grouped {
Total = COUNT(flights_join);
delayed_flights = FILTER flights_join BY CarrierDelay > 0;
avg_CarrierDelay = AVG(flights_join.CarrierDelay);
Delayed = COUNT(delayed_flights);
DelayedTotalRatio = (double)(Delayed / (double)Total);
GENERATE FLATTEN(group) AS (Code, Description), avg_CarrierDelay as avg_CarrierDelay, Delayed as Delayed, Total as Total, DelayedTotalRatio as DelayedTotalRatio;
}

ordered_result = ORDER result by DelayedTotalRatio DESC;

DUMP ordered_result;

Let me try to explain the above code (item numbers match line numbers of the code block):

Line 1) I define “result” as the result data set. Pig should process each row (group) in the “flight_grouped” data set.

Line 2) The number of flights for each airline will be counted and stored as Total.

Line 3) A new data set for delayed flights will be created.

Line 4) Average delay will be calculated and stored as avg_CarrierDelay.

Line 5) The number of delayed flights will be counted and stored as Delayed.

Line 6) The delayed/total flight ratio is calculated and stored as DelayedTotalRatio.

Line 7) The row containing (Airline) Code, Description, average delay time, number of delayed flights, total number of flights and the ratio is returned as a row (for the result set).

Line 10) A new data set (ordered_result) is generated by ordering the result data set by the DelayedTotalRatio column in descending order.

Line 12) DUMP is used to output the result (to the screen). It will trigger the MapReduce jobs produced by Pig.

This is the whole script (you can copy and paste it into Grunt):

carriers_file = LOAD '/user/zeppelin/carriers.csv'
USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER');
carriers = FOREACH carriers_file GENERATE REPLACE($0, '"', '') as Code, REPLACE($1, '"', '') as Description;
flights_file = LOAD '/user/zeppelin/2008.csv'
USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER');

flights = FOREACH flights_file GENERATE
(chararray)$8 as Code:chararray,
(chararray)$22 as CancellationCode:chararray,
(int)REPLACE($24, 'NA', '0') as CarrierDelay:int,
(int)REPLACE($25, 'NA', '0') as WeatherDelay:int,
(int)REPLACE($26, 'NA', '0') as NASDelay:int,
(int)REPLACE($27, 'NA', '0') as SecurityDelay:int,
(int)REPLACE($28, 'NA', '0') as LateAircraftDelay:int;

flights_join = JOIN flights BY Code, carriers BY Code;
flight_grouped = GROUP flights_join by (flights::Code, carriers::Description);

result = FOREACH flight_grouped {
Total = COUNT(flights_join);
delayed_flights = FILTER flights_join BY CarrierDelay > 0;
avg_CarrierDelay = AVG(flights_join.CarrierDelay);
Delayed = COUNT(delayed_flights);
DelayedTotalRatio = (double)(Delayed / (double)Total);
GENERATE FLATTEN(group) AS (Code, Description), avg_CarrierDelay as avg_CarrierDelay,
Delayed as Delayed, Total as Total, DelayedTotalRatio as DelayedTotalRatio;
}

ordered_result = ORDER result by DelayedTotalRatio DESC;
DUMP ordered_result;
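
If you’d rather write the report to HDFS than dump it to the screen, you can replace the last line with a STORE statement; the output path below is just an example (and the target directory must not already exist):

STORE ordered_result INTO '/user/zeppelin/carrier_delay_report' USING PigStorage(',');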

The following are the statistics of the MapReduce jobs produced when you run the last statement (DUMP) of the script:

2017-06-22 04:54:01,470 [main] INFO  org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics:
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
2.7.1.2.4.2.0-258 0.15.0 hdfs 2017-06-22 04:51:52 2017-06-22 04:54:01 HASH_JOIN,GROUP_BY

Success!

Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTime AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs
job_1497859064811_0022 7 1 18 3 13 15 37 37 37 37 carriers,carriers_file,flights,flights_file,flights_join HASH_JOIN
job_1497859064811_0023 3 1 12 9 10 10 36 36 36 36 delayed_flights,flight_grouped,result GROUP_BY hdfs://mycluster/tmp/temp1852761250/tmp-1091973150,
Input(s):
Successfully read 7009728 records from: "/user/zeppelin/2008.csv"
Successfully read 1491 records from: "/user/zeppelin/carriers.csv"
Output(s):
Successfully stored 20 records (1271 bytes) in: "hdfs://mycluster/tmp/temp1852761250/tmp-1091973150"
Counters:
Total records written : 20
Total bytes written : 1271
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 2
Total records proactively spilled: 1005991
Job DAG:
job_1497859064811_0022 -> job_1497859064811_0023,
job_1497859064811_0023

The whole process took less than 3 minutes, and here’s the result:

(EV,Atlantic Southeast Airlines,6.01967031987882,37398,280575,0.13329056402031542)
(YV,Mesa Airlines Inc.,7.561840505236732,30855,254930,0.12103322480680971)
(AA,American Airlines Inc.,4.6651958636765665,72843,604885,0.12042454350827017)
(F9,Frontier Airlines Inc.,2.620778596938243,11264,95762,0.11762494517658362)
(B6,JetBlue Airways,3.455645593117481,21451,196091,0.10939308790306541)
(OH,Comair Inc.,5.44396706594402,21308,197607,0.10783018820183496)
(NW,Northwest Airlines Inc.,4.252088295191744,36800,347652,0.10585297941619781)
(UA,United Air Lines Inc.,3.826679866077884,46847,449515,0.10421676695994572)
(WN,Southwest Airlines Co.,1.8814183268788787,117195,1201754,0.09751995832757786)
(CO,Continental Air Lines Inc.,3.3377259553366505,28618,298455,0.09588715216699335)
(XE,Expressjet Airlines Inc.,3.252273637553069,34914,374510,0.09322581506501829)
(DL,Delta Air Lines Inc.,3.1226448285247086,41296,451931,0.0913767809687762)
(MQ,American Eagle Airlines Inc.,3.475229929915446,44592,490693,0.0908755576297196)
(US,US Airways Inc. (Merged with America West 9/05. Reporting for both starting 10/07.),2.4456832066033347,38134,453589,0.0840717036788811)
(AS,Alaska Airlines Inc.,3.1887930007544574,12374,151102,0.08189170229381477)
(HA,Hawaiian Airlines Inc.,2.9899233332255037,4718,61826,0.07631093714618445)
(OO,Skywest Airlines Inc.,2.9589868096953413,39663,567159,0.06993277017555924)
(9E,Pinnacle Airlines Inc.,3.464840889675372,16176,262208,0.06169148157188187)
(FL,AirTran Airways Corporation,1.6955870439155623,13943,261684,0.05328182082206019)
(AQ,Aloha Airlines Inc.,1.1188461538461538,233,7800,0.029871794871794873)

Pig Latin is a very powerful language for processing data, but it requires a different mindset than SQL. I hope this blog post helps you, or at least gives you an idea about Pig.

 

About the Author

Gokhan Atil

Gokhan Atil has over 15 years of experience in the IT industry and a strong background in database management (Oracle 8i, 9i, 10g, 11g), software development and UNIX systems. He is an Oracle Certified Professional for EBS R12, Oracle 10g and 11g. Gokhan specializes in high availability solutions, performance tuning and monitoring tools. Gokhan is a founding member and current vice president of the Turkish Oracle User Group (TROUG). He’s also a member of the Independent Oracle User Group (IOUG). Gokhan has presented at various conferences, and he is one of the co-authors of the “Expert Oracle Enterprise Manager 12c” book.
