Search notes:

MapReduce: first example

This example is a mixture of the Hadoop WordCount v1.0 tutorial (https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html#Example:_WordCount_v1.0) and the InvertedIndexJob example (https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch1/InvertedIndexJob.java):
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Introductory MapReduce job that builds a simple inverted index:
 * for every word found in the input files it outputs the names of
 * the files in which that word occurs.
 *
 * Usage: first &lt;input path&gt; &lt;output path&gt;
 */
public class first {

  /**
   * Emits one (word, file name) pair for every word of every input line.
   */
  public static class Mapper_ extends Mapper <
     //
     // Types that are relevant for the mapper:
        LongWritable,      // Type of key_in    (1st parameter of method map())
        Text,              // Type of value_in  (2nd parameter of method map())
        Text,              // Type of key_out   (1st parameter of method Context.write())
        Text               // Type of value_out (2nd parameter of method Context.write())
      > {

    // Out-key holder that is reused for every emitted word instead of
    // allocating a new Text per word (same pattern as the Hadoop
    // WordCount tutorial).
    private final Text key_out_word = new Text();

    // Out-value: name of the file this mapper's split belongs to.
    // Assigned once in setup().
    private Text value_out_filename;

    /**
     * Setup is called once at the beginning of the task. It determines
     * the name of the file that is processed by this mapper.
     *
     * The Context object allows the Mapper/Reducer to interact with the
     * rest of the Hadoop system.
     * It includes
     *   o  configuration data for the job
     *   o  interfaces to emit output (see context.write in reduce() below)
     * It can be used to
     *   o  report progress
     *   o  set application-level status messages
     *   o  indicate a job is alive
     *   o  to get the values that are stored in job
     *      configuration across map/reduce phase
     * See also http://stackoverflow.com/a/26955662/180275
     *
     * @param ctx the task context
     * @throws IllegalStateException if the input split is not a FileSplit
     */
    @Override
    protected void setup(Context ctx) {

      // InputSplit represents the data to be processed by an individual Mapper.
      InputSplit input_split = ctx.getInputSplit();

      // This job needs the file name, which only a FileSplit provides.
      // Fail with an explicit message rather than a bare ClassCastException.
      if (!(input_split instanceof FileSplit)) {
        throw new IllegalStateException(
            "Expected a FileSplit but got: "
            + (input_split == null ? "null" : input_split.getClass().getName()));
      }

      // A FileSplit represents a section of an input file. The class extends InputSplit.
      FileSplit file_split = (FileSplit) input_split;

      // The Path to the file split's data.
      Path file_path = file_split.getPath();

      // The file name of the input document is the out-value
      // (the word is the out-key, see map()).
      value_out_filename = new Text(file_path.getName());
    }

    /**
     * map is called once for each key/value pair in the input split.
     *
     * Generates a key/value pair for each word found in the input line.
     * The word is the key, while the value is the file name.
     *
     * @param key_in   key of the input record (not used)
     * @param value_in the text whose words are emitted
     * @param ctx      the task context used to emit the pairs
     */
    @Override
    protected void map(
      LongWritable      key_in,
      Text              value_in,
      Context           ctx
    ) throws IOException, InterruptedException {

      for (String word : StringUtils.split(value_in.toString())) {
        // Mapper.Context (of which class ctx is an instance) implements
        // the interface MapContext, which in turn extends TaskInputOutputContext.
        // So, see TaskInputOutputContext for the API documentation.
        key_out_word.set(word);
        ctx.write(key_out_word, value_out_filename);
      }

    }

  }

  /**
   * Collects, for each word, the distinct names of the files in which
   * the word occurs and writes them as one " - " separated value.
   */
  public static class Reducer_ extends Reducer <
     //
     // Types that are relevant for the reducer:
        Text    , // Type of 1st (key) parameter of reduce(), corresponds
                  // to type of mapper's out key.
        Text    , // Type of which the reduce() method receives an
                  // Iterable, corresponds to type of mapper's out value.
        Text    , // Type of 1st parameter of context.write()
        Text      // Type of 2nd parameter of context.write()
    > {

    /**
     * reduce is called once for each (unique) map-output-key (which,
     * in this example, is a word).
     *
     * @param key_in_word          the word
     * @param values_in_file_names file names emitted by the mappers for
     *                             this word (may contain duplicates)
     * @param context              the task context used to emit the result
     */
    @Override
    public void reduce(
        Text                  key_in_word,
        Iterable<Text>        values_in_file_names,
        Context               context
    ) throws IOException, InterruptedException {

      // A sorted set removes duplicate file names and makes the order
      // of the joined output deterministic.
      Set<String> file_names_unique = new TreeSet<String>();

      // Iterate over each file name in which the word (key_in_word)
      // occurs. The names are stored as Strings (copies), not as the
      // Text objects handed out by the iterator.
      for (Text file_name : values_in_file_names) {
        file_names_unique.add(file_name.toString());
      }

      String file_names_out = StringUtils.join(file_names_unique, " - ");

      // Write a key/value pair. In this case: key_in = key_out.
      context.write(key_in_word, new Text(file_names_out));
    }
  }

  /**
   * Configures and runs the job.
   *
   * @param args args[0] is the input path, args[1] the output directory
   * @throws Exception if the job cannot be set up or fails while running
   */
  public static void main(String[] args) throws Exception {

    // Both paths are required; without this check a missing argument
    // surfaces as an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      System.err.println("Usage: first <input path> <output path>");
      System.exit(2);
    }

    // A Configuration provides access to configuration parameters.
    Configuration conf = new Configuration();

    // The Job class is used to create, configure and run
    // instances of MapReduce applications.
    //
    // getInstance(...) creates a new Job with no particular Cluster
    // and a given job name.
    Job job = Job.getInstance(conf, "MapRed Introduction");

    // Specify the classes that make up the MapReduce job:
    job.setJarByClass  (first    .class);
    job.setMapperClass (Mapper_  .class);
    job.setReducerClass(Reducer_ .class);

    // Specify the output key and value types for a job.
    // By default, setOutput{Key,Value}Class() sets the type
    // expected as output from both the map and reduce phases.
    //
    // If the mapper emits different types than the reducer,
    // the mapper's types can be set with job.setMapOutput{Key,Value}Class().
    job.setOutputKeyClass  (Text.class);
    job.setOutputValueClass(Text.class);

    // Set Path to input files for the map-reduce job:
    FileInputFormat.setInputPaths (job, new Path(args[0]));

    // Set the Path of the output directory for the map-reduce job:
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}
GitHub repository about-hadoop, path: /MapReduce/first/first.java
Running the whole thing
. scripts/load-files-into-HDFS
. scripts/run
GitHub repository about-hadoop, path: /MapReduce/first/run

Index