SHA1 with BASE64 Hadoop Hive UDF

This is a simple UDF that applies SHA1 + Base64 to a string in Hive. It works like a charm in Hadoop Hive (tested with CDH 4.2.1).


package io.jackass.hadoop.hive.udf.crypto;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

import java.security.*;
import org.apache.commons.codec.binary.Base64;

public final class sha1 extends UDF {

    public Text evaluate(final Text s) {
        if (s == null) {
            return null;
        }
        try {
            MessageDigest md = MessageDigest.getInstance("SHA1");
            // Text stores its content as UTF-8 bytes, so hash those directly
            // instead of relying on the platform default charset
            md.update(s.getBytes(), 0, s.getLength());
            byte[] hash = md.digest();
            // Base64-encode the 20-byte digest (default Base64() does no chunking)
            Base64 encoder = new Base64();
            return new Text(encoder.encodeToString(hash));
        } catch (NoSuchAlgorithmException nsae) {
            throw new IllegalArgumentException("SHA1 is not setup", nsae);
        }
    }
}

It’s really simple to use in Hive:


    ADD JAR hive-crypto-udfs-1.0.jar;
    CREATE TEMPORARY FUNCTION sha1 as 'io.jackass.hadoop.hive.udf.crypto.sha1';
    select sha1('1111') from your_table;
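
For a more realistic call than the literal above, you can apply the UDF to any string column (customer_id below is just a hypothetical column name); as the evaluate() method shows, a NULL input simply returns NULL.

    -- hash a column value; NULL inputs pass through as NULL
    select customer_id,
           sha1(customer_id) as customer_id_sha1
      from your_table;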

If you need some help building the JAR file, here is old-school javac (tested with CDH 4.2.1):

  • save the code above as io/jackass/hadoop/hive/udf/crypto/sha1.java
  • run the commands below

        CP=$(find "/opt/cloudera/parcels/CDH/lib" -name '*.jar' -printf '%p:' | sed 's/:$//')
        javac -classpath $CP io/jackass/hadoop/hive/udf/crypto/sha1.java
        jar -cf hive-crypto-udfs-1.0.jar -C . .
    

    Dashing with Oracle database

    I have been using Dashing (http://dashing.io/) with great success. It is an amazingly simple framework, even for non-Ruby developers. Here is a simple recipe for creating jobs that pull data from an Oracle database.

     

    One Time Setup

    I am running CentOS 6.5 64-bit, but in general this should work on any platform with a few tweaks.

     

    install Oracle Instant Client and ruby-oci8

    • get the 3 ZIPs (or RPMs or …) from the Oracle web site http://www.oracle.com/technetwork/topics/linuxx86-64soft-092277.html
    • run the install; in my case there are 3 ZIPs
      
      cd ~
      mkdir oracle
      cd oracle
      
      # wget the files below manually first (Oracle requires a login)
      unzip instantclient-basic-linux.x64-12.1.0.1.0.zip
      unzip instantclient-sdk-linux.x64-12.1.0.1.0.zip
      unzip instantclient-sqlplus-linux.x64-12.1.0.1.0.zip
      
      export ORACLE_HOME=~/oracle
      export LD_LIBRARY_PATH=$ORACLE_HOME/instantclient_12_1
      
      # the ruby-oci8 build needs libclntsh.so without the version suffix
      cd ~/oracle/instantclient_12_1
      ln -s libclntsh.so.12.1 libclntsh.so
      
      gem install ruby-oci8
      
      
    • in case you need more help, check this http://rubydoc.info/gems/ruby-oci8/frames/file/README.md

     

    Let’s Play

     

    install dashing

    really don’t expect me to retype how to install it  ;-)

     

    edit the Gemfile in your dashing project and add

    gem 'ruby-oci8'

    then run bundle install

     

    setup username/password/oracle server

    For simplicity I will set up the user ID/password as shell variables:

    export ORACLE_USER=myid
    export ORACLE_PASSWORD=mypassword
    export ORACLE_TNS=myoracleserver/myService
    

     

     

    Sample Job sending data to Rickshawgraph

    require 'oci8'
    
    points_field1 = []
    points_field2 = []
    last_x = 1
    elements_total = 100
    
    SCHEDULER.every '10s', :first_in => 0 do |job|
      begin
        conn = OCI8.new(ENV['ORACLE_USER'], ENV['ORACLE_PASSWORD'], ENV['ORACLE_TNS'])
    
        conn.exec("
                select sum(field1)  field1,
                       sum(field2)  field2,
                       count(*)
                  from mytable
              ") do |r|
          last_x += 1
    
          points_field1 << { x: last_x, y: r[0].to_i }
          points_field2 << { x: last_x, y: r[1].to_i }
    
          series = [
                     { name: "FIELD1", data: points_field1.last(elements_total) },
                     { name: "FIELD2", data: points_field2.last(elements_total) }
                   ]
    
          send_event('oracle-graph', series: series)
        end
        conn.logoff
    
      rescue Exception => e
        puts e.message
      end
    end
    

     

     

    Sample Job sending data to List

     

    require 'oci8'
    
    SCHEDULER.every '10s', :first_in => 0 do |job|
      begin
        conn = OCI8.new(ENV['ORACLE_USER'], ENV['ORACLE_PASSWORD'], ENV['ORACLE_TNS'])
    
        mylist = Hash.new
        conn.exec("
                    select field1, count(*) from mytable group by field1
                  ") do |r|
          mylist[r[0]] = { label: r[0], value: r[1].to_i.to_s }
        end
        send_event('oracle-list', { items: mylist.values })
    
        conn.logoff
    
      rescue Exception => e
        puts e.message
      end
    end
    
    
    

     

     

     

    query JSON in Hive

    Let's prep a simple JSON file first

     

    this is a somewhat simple JSON record; it includes one array

    {
      "myData": {
        "my_address_info": "UNV-ADDR",
        "transaction_number": "8007038190",
        "customer_id": "jiri2",
        "inventory": [
          {
            "product_category": "electronic",
            "amount": "5000.20"
          }
        ],
        "transaction_datetime": "06\/04\/2013 21:38:14"
      }
    }
    

    flattened onto a single line, it will look like this

    { "myData":{ "my_address_info":"UNV-ADDR", "transaction_number":"8007038190", "customer_id":"jiri2", "inventory":[ { "product_category":"electronic", "amount":"5000.20" } ], "transaction_datetime":"06/04/2013 21:38:14" } }
    

    let's add a few more dummy records

    { "myData":{ "my_address_info":"UNV-ADDR", "transaction_number":"8007038190", "customer_id":"jiri2", "inventory":[ { "product_category":"electronic", "amount":"5000.20" } ], "transaction_datetime":"06/04/2013 21:38:14" } }
    { "myData":{ "my_address_info":"UNV-AAAA", "transaction_number":"8007038191", "customer_id":"jiri1", "inventory":[ { "product_category":"electronic", "amount":"5000.30" } ], "transaction_datetime":"06/04/2013 21:38:14" } }
    { "myData":{ "my_address_info":"UNV-BBBB", "transaction_number":"8007038192", "customer_id":"jiri3", "inventory":[ { "product_category":"electronic", "amount":"5000.40" } ], "transaction_datetime":"06/04/2013 21:38:14" } }
    { "myData":{ "my_address_info":"UNV-CCCC", "transaction_number":"8007038193", "customer_id":"jiri1", "inventory":[ { "product_category":"electronic", "amount":"5000.50" } ], "transaction_datetime":"06/04/2013 21:38:14" } }
    

    and save it to the Hadoop cluster (in my case Kerberos enabled) as a file sample.json

    kinit yourname@COMPANY.COM;
    hadoop fs -put sample.json /projects/jiri/json_demo
    

     

    method 1: custom serde Hive-JSON-Serde

    prep the serde (one-time setup)

    1. download the serde from https://github.com/rcongiu/Hive-JSON-Serde
    2. compile it
      mvn package
      
    3. save it on Hadoop client e.g. to /home/jiri/hive/json-serde-1.1.7.jar
    4. run hive CLI and add the serde
      kinit yourname@COMPANY.COM
      hive
      ADD JAR /home/jiri/hive/json-serde-1.1.7.jar;
      

    note: you can add it to the .hiverc file in your home directory if you don't want to add it manually every time you start hive
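
    For reference, a minimal ~/.hiverc would hold just that one line (the path below is the one used above – adjust it to wherever you keep the serde JAR):

      ADD JAR /home/jiri/hive/json-serde-1.1.7.jar;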

    create table

    I used an external table, but a regular table works the same way

     

    CREATE EXTERNAL TABLE json_serde (
         myData struct <
                          my_address_info :string,
                          transaction_number :string,
                          customer_id :string,
                          inventory :array<struct<product_category:string, amount:string>>,
                          transaction_datetime :string
                       >
    )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    LOCATION "/projects/jiri/json_demo";
    

    optionally, you can use this tool to generate the table DDL for you (it parses the JSON and creates the DDL based on its content): https://github.com/midpeter444/hive-json-schema

    let’s run some queries

    simple select statement

    select * from json_serde;
    
    {"my_address_info":"UNV-ADDR","transaction_number":"8007038190","customer_id":"jiri2","inventory":[{"product_category":"electronic","amount":"5000.20"}],"transaction_datetime":"06/04/2013 21:38:14"}
    {"my_address_info":"UNV-AAAA","transaction_number":"8007038191","customer_id":"jiri1","inventory":[{"product_category":"electronic","amount":"5000.30"}],"transaction_datetime":"06/04/2013 21:38:14"}
    {"my_address_info":"UNV-BBBB","transaction_number":"8007038192","customer_id":"jiri3","inventory":[{"product_category":"electronic","amount":"5000.40"}],"transaction_datetime":"06/04/2013 21:38:14"}
    {"my_address_info":"UNV-CCCC","transaction_number":"8007038193","customer_id":"jiri1","inventory":[{"product_category":"electronic","amount":"5000.50"}],"transaction_datetime":"06/04/2013 21:38:14"}
    

    simple group by query

      select myData.customer_id, count(*)
        from json_serde
    group by myData.customer_id;
    
    jiri1 2
    jiri2 1
    jiri3 1
    

    let’s select data from the array

      select myData.customer_id, count(*), sum(myData.inventory[0].amount)
        from json_serde
    group by myData.customer_id;
    
    jiri1 2 10000.8
    jiri2 1 5000.2
    jiri3 1 5000.4
    

    let’s do a join (in this case a dummy self-join) and group by

      select a.myData.customer_id, count(*), sum(a.myData.inventory[0].amount)
        from json_serde a inner join json_serde b on (a.myData.customer_id = b.myData.customer_id)
    group by a.myData.customer_id;
    
    jiri1 4 20001.6
    jiri2 1 5000.2
    jiri3 1 5000.4
    

    interesting twist using explode

    The field “inventory” in the example above is an array, and so far I always referenced the first element directly instead of asking for all elements of the array. That can be done with the “explode” function, which basically creates multiple rows (one row per array element). This is the same example as above, except the first record now has two values in the array.

    { "myData":{ "my_address_info":"UNV-ADDR", "transaction_number":"8007038190", "customer_id":"jiri2", "inventory":[ { "product_category":"electronic", "amount":"5000.20" } , { "product_category":"electronic", "amount":"50000.20" } ], "transaction_datetime":"06/04/2013 21:38:14" } }
    { "myData":{ "my_address_info":"UNV-AAAA", "transaction_number":"8007038191", "customer_id":"jiri1", "inventory":[ { "product_category":"electronic", "amount":"5000.30" } ], "transaction_datetime":"06/04/2013 21:38:14" } }
    { "myData":{ "my_address_info":"UNV-BBBB", "transaction_number":"8007038192", "customer_id":"jiri3", "inventory":[ { "product_category":"electronic", "amount":"5000.40" } ], "transaction_datetime":"06/04/2013 21:38:14" } }
    { "myData":{ "my_address_info":"UNV-CCCC", "transaction_number":"8007038193", "customer_id":"jiri1", "inventory":[ { "product_category":"electronic", "amount":"5000.50" } ], "transaction_datetime":"06/04/2013 21:38:14" } }
    

    I will use the explode function on the inventory array to … well, explode the record, in this case creating 5 rows (the first row turns into two rows).
    “lateral view” is almost like a virtual view created from the source table (in this case JSON_SERDE) and joined back to it: it takes the array, explodes it, and injects the exploded rows into the result.

    select myData.customer_id, inventory.product_category, inventory.amount
     from json_serde a
    lateral view explode( myData.inventory) b as inventory;
    
    jiri2 electronic 5000.20
    jiri2 electronic 50000.20
    jiri1 electronic 5000.30
    jiri3 electronic 5000.40
    jiri1 electronic 5000.50
    ..
    ..
    

     

    method 2: using built in get_json_object function

    as the name says, its main purpose is manipulating a JSON object. Probably the best use case is a file with multiple fields where the JSON is just one of them; using it several times on pure JSON is a bit clunky and inefficient. Let's look at why.

    create table

    USE MY_SCHEMA;
    CREATE EXTERNAL TABLE json_table ( json string )
    ROW FORMAT DELIMITED
    STORED AS TextFile
    LOCATION "/projects/jiri/json_demo";
    

    run sample queries

    select get_json_object(json_table.json, '$')
     from json_table;
    set hive.cli.print.header=true;
    
    select get_json_object(json_table.json, '$.myData.my_address_info') as my_address_info,
           get_json_object(json_table.json, '$.myData.customer_id') as customer_id
      from json_table;
    ..
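
    Array elements are reachable as well via get_json_object's bracket syntax; a small sketch against the same table (inventory[0] picks the first array element):

    select get_json_object(json_table.json, '$.myData.customer_id')          as customer_id,
           get_json_object(json_table.json, '$.myData.inventory[0].amount')  as first_amount
      from json_table;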
    

     

    method 3: using built in json_tuple function with LateralView

    json_tuple is perhaps a somewhat more advanced and faster approach, but it is still very clunky and hard to work with on deeper JSON: each tuple can go only one level deep, so you need a lateral view on a lateral view.

    create table

    USE MY_SCHEMA;
    CREATE EXTERNAL TABLE json_table ( json string )
    ROW FORMAT DELIMITED
    STORED AS TextFile
    LOCATION "/projects/jiri/json_demo";
    

    run sample queries

    the first sample peels off the first level of the JSON

    set hive.cli.print.header=true;
    select b.*
    from json_table a
    lateral view json_tuple(a.json, 'myData') b as myData;
    
    myData
    {"my_address_info":"UNV-ADDR","transaction_number":"8007038190","customer_id":"jiri2","inventory":[{"product_category":"electronic","amount":"5000.20"}],"transaction_datetime":"06/04/2013 21:38:14"}
    {"my_address_info":"UNV-AAAA","transaction_number":"8007038191","customer_id":"jiri1","inventory":[{"product_category":"electronic","amount":"5000.30"}],"transaction_datetime":"06/04/2013 21:38:14"}
    {"my_address_info":"UNV-BBBB","transaction_number":"8007038192","customer_id":"jiri3","inventory":[{"product_category":"electronic","amount":"5000.40"}],"transaction_datetime":"06/04/2013 21:38:14"}
    {"my_address_info":"UNV-CCCC","transaction_number":"8007038193","customer_id":"jiri1","inventory":[{"product_category":"electronic","amount":"5000.50"}],"transaction_datetime":"06/04/2013 21:38:14"}
    

    second example goes two levels deep

    set hive.cli.print.header=true;
    
    select c.*
    from json_table a
    lateral view json_tuple(a.json, 'myData') b as myData
    lateral view json_tuple(b.myData, 'my_address_info', 'transaction_number', 'customer_id', 'inventory') c as address, trasref, mid, inventory;
    
    address trasref mid inventory
    UNV-ADDR 8007038190 jiri2 [{"product_category":"electronic","amount":"5000.20"}]
    UNV-AAAA 8007038191 jiri1 [{"product_category":"electronic","amount":"5000.30"}]
    UNV-BBBB 8007038192 jiri3 [{"product_category":"electronic","amount":"5000.40"}]
    UNV-CCCC 8007038193 jiri1 [{"product_category":"electronic","amount":"5000.50"}]
    ...
    

     

    lessons learned

  • hive-json-schema (the tool I used to create the DDL from the JSON – a one-time step) is a bit buggy; it got the job done reasonably well and saved a lot of time, but a few tweaks were needed to fix the generated DDL
  • Hadoop had no issue creating the sample JSON (a less-than-10-minutes task for a 13mm-record JSON file)
  • Hadoop has no issue querying JSON through the JSON serde, with reasonable times for simple sums as well as various joins
  • the JSON serde deserializes escape characters. For example, if your JSON includes a \n character, the serde will translate (deserialize) it to a NEWLINE automatically (there is currently no way to suppress this deserialization)
  • by default Hive with the JSON serde throws an error if the JSON is malformed (this is a good thing); you can suppress the error with the setting "ignore.malformed.json" = "true"
    ALTER TABLE SAMPLE_TABLE SET SERDEPROPERTIES ("ignore.malformed.json" = "true");
  • Hive does not like "dot" in names, so every JSON field whose name includes a "dot" has to be either renamed (e.g. "score.reason_code" was renamed to "score_reason_code") or a mapping property has to be set in the table DDL, e.g. WITH SERDEPROPERTIES ("mapping.my_reason_code" = "my.reason_code") – see the sketch after this list
  • if your JSON record spans several rows, you have to create your own custom InputFormat/RecordReader; you cannot use the default TextInputFormat, which splits records on newlines. Tom White's awesome "Hadoop: The Definitive Guide" book from O'Reilly goes into this in great depth.
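
    A minimal sketch of the WITH SERDEPROPERTIES mapping mentioned in the "dot" bullet above; the column my_reason_code and the JSON field my.reason_code are just the hypothetical names from that example, with the serde and location reused from this post:

    CREATE EXTERNAL TABLE json_serde_mapped ( my_reason_code string )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES ("mapping.my_reason_code" = "my.reason_code")
    LOCATION "/projects/jiri/json_demo";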