17 December 2013

Connecting Tableau to ElasticSearch (READ: How to query ElasticSearch with Hive SQL and Hadoop)

I've been a big fan of ElasticSearch since last spring, using it on my RiffBank project as well as various other "data collection" experiments. Put (very) simply, it's a badass search server based on Apache Lucene. But in all honesty, to me it's really a very scalable, auto-balancing, redundant NoSQL data store with all the benefits of a full search and analytics server.

#helladistributed

Also... it's fast. Really. Fucking. Fast.

Generally speaking, that is (can you use "generally speaking" after dropping an f-bomb?).

(yes, I said "Hadoop" - don't be afraid, little Ricky)

My current love affair (romantic fling?) with ES notwithstanding, it usually isn't used as a general-purpose data store (even though it's more than capable), but with the impending release of v1.0 I can see this changing and its role expanding. At the very least, why would you bother using something like MongoDB (or a similar "hot" NoSQL sink) when ES is all that and more (plus a joy to work with and scale out)?

But therein lies a problem - if I'm storing a shit-ton of data in ES, I certainly can't use my go-to visual analytics tool Tableau on it, since ES querying is strictly RESTful... or can I?

Enter ES's Hadoop plug-in. While it was (presumably) created primarily to get Hadoop data INTO ES, we can also use it to create an external "table" (more like a 'data structure template') in Hive that points to an ES index, SQL our little goat hearts out on it, and connect to it from Tableau with a pretty generic Hive driver.

So: Tableau -> Hive SQL -> Hadoop -> Map/Reduce -> ES

So, yeah, there is some serious overhead and translation / abstraction involved (obviously) and queries will be much slower than native - but the only other (direct) alternative is... well, there ISN'T ONE. You'd have to build custom ETL to load data from ES to another DB and query that directly (or query the initial source, if possible).

Maybe some day Tableau will allow us to write data-source access plug-ins...

Granted, if you already run a Hadoop cluster you should be able to leverage it for better-scaling MapReduce jobs. But I'm going to assume that you don't use Hadoop at all, so we will set up an instance JUST for making ElasticSearch queries via Hive SQL.

Note: This step-by-step was created on a DigitalOcean 32-bit VM using a fresh Ubuntu 12.04 LTS install (64-bit would work fine as well albeit with small library differences - I was using small instances, hence the 32-bit).

First things first: this assumes that your ES cluster is already running somewhere on the network that is accessible from the box we will be configuring. In my case it's a group of similar Ubuntu 12.04 LTS DigitalOcean VMs, as shown below. I'm not going to cover setting up ES, but trust me, it's dead easy and very flexible.

For my sample dataset I'm using a Wikipedia English Page dump (imported via the ES wikipedia-river plugin).

(showing my test ES cluster* / index setup using the excellent "head" plugin, which is great for watching shards re-balance and overall a great front-end tool for most 'Elastic' needs)

*Lovecraftian node names optional

"They had come from the stars, and had brought Their images with Them..."


Let's get started...

Ok, first log in to a fresh 12.04 box... let's create a 'dumbo' user and get him/her all configured correctly...

I'm going to assume that you are logged in as root.

addgroup hadoop
adduser --ingroup hadoop dumbo

Now to make sure that we can invoke sudo as dumbo.

visudo

#add this at the bottom and save
dumbo ALL=(ALL) ALL

Many posts on setting up Hadoop recommend disabling ipv6, so we will just take care of that in order to eliminate any possible cluster 'lookup' issues later.

nano /etc/sysctl.conf

#add these lines, then save and exit
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1

#apply the new settings
sysctl -p
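To confirm the change actually took effect, you can read the runtime values back from /proc. This is a minimal sketch: the function name `ipv6_disabled` is my own (not part of any tool), it assumes the standard /proc/sys/net/ipv6/conf layout, and it treats a missing IPv6 sysctl tree as "nothing left to disable".

```shell
#!/usr/bin/env bash
# Sketch: confirm the three disable_ipv6 keys are really 1 after `sysctl -p`.
# $1 = sysctl conf directory (defaults to the live kernel values under /proc).
ipv6_disabled() {
    local conf_dir="${1:-/proc/sys/net/ipv6/conf}"
    local iface value
    # No IPv6 sysctl tree at all: treat it as nothing left to disable.
    [ -d "$conf_dir" ] || return 0
    for iface in all default lo; do
        # An unreadable key counts as a failure.
        value=$(cat "$conf_dir/$iface/disable_ipv6" 2>/dev/null) || return 1
        if [ "$value" != "1" ]; then
            echo "net.ipv6.conf.$iface.disable_ipv6 = $value (expected 1)" >&2
            return 1
        fi
    done
}
```

Run `ipv6_disabled && echo "IPv6 is off"` after `sysctl -p`; if a key is still 0, the offending key is printed to stderr instead.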

Ok, enough root. Now feel free to disconnect and re-login as our 'dumbo' user (or SU to it, or reboot, whatever you like).

Let's set some environment settings for later (and for fun and laziness).

nano ~/.bashrc

###### put at end of .bashrc #########
PS1='${debian_chroot:+($debian_chroot)}\[\033[01;32m\]\u@\h\[\033[00m\]:\[\033[01;34m\]\w\[\033[00m\]\$ '

alias free="free -m"
alias update="sudo aptitude update"
alias install="sudo aptitude install"
alias upgrade="sudo aptitude safe-upgrade"
alias remove="sudo aptitude remove"

# Set Hadoop/Hive-related environment variables
export HADOOP_HOME=/home/dumbo/hadoop
export HIVE_HOME=/home/dumbo/hive
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME


# Set JAVA_HOME (we will also configure JAVA_HOME directly for Hadoop later on)
export JAVA_HOME=/usr/lib/jvm/java-7-oracle

# Some convenient aliases and functions for running Hadoop-related commands
unalias fs &> /dev/null
alias fs="hadoop fs"
unalias hls &> /dev/null
alias hls="fs -ls"

lzohead () {
    hadoop fs -cat $1 | lzop -dc | head -1000 | less
}

# Add Hadoop bin/ directory to PATH
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin
######end .bashrc#########

Let's re-up our .bashrc to reflect the changes

source ~/.bashrc

Ok, let's add some packages and update our install (using some of the aliases we just created)

update
install htop zip python build-essential python-dev python-setuptools locate python-software-properties lzop
sudo add-apt-repository ppa:webupd8team/java
update
install oracle-java7-installer

Ok, now we can start installing shit without fear! You scared? Don't be scared. Be brave!

First up, Hadoop.

cd ~
wget http://mirror.metrocast.net/apache/hadoop/core/hadoop-2.2.0/hadoop-2.2.0.tar.gz
tar -zxvf ./hadoop-2.2.0.tar.gz
mv ~/hadoop-2.2.0 ~/hadoop

Ok, now we have to edit a bunch of config files. Bear with me here...

nano /home/dumbo/hadoop/etc/hadoop/hadoop-env.sh

#add or modify
export JAVA_HOME=/usr/lib/jvm/java-7-oracle
export HADOOP_CONF_DIR=/home/dumbo/hadoop/etc/hadoop
nano /home/dumbo/hadoop/etc/hadoop/core-site.xml
<configuration>
       <property>
              <name>fs.default.name</name>
              <value>hdfs://localhost:9000</value>
        </property>
</configuration>
nano /home/dumbo/hadoop/etc/hadoop/hdfs-site.xml
<configuration>
    <property>
              <name>dfs.replication</name>
              <value>1</value>
          </property>
       <property>
              <name>dfs.name.dir</name>
              <value>/home/dumbo/dfs/name</value>
              <final>true</final>
          </property>
       <property>
              <name>dfs.data.dir</name>
              <value>/home/dumbo/dfs/data</value>
              <final>true</final>
          </property>
       <property>
              <name>dfs.tmp.dir</name>
              <value>/home/dumbo/dfs/tmp</value>
              <final>true</final>
       </property>
</configuration>
mv /home/dumbo/hadoop/etc/hadoop/mapred-site.xml.template /home/dumbo/hadoop/etc/hadoop/mapred-site.xml
nano /home/dumbo/hadoop/etc/hadoop/mapred-site.xml
<configuration>
       <property>
              <name>mapred.job.tracker</name>
              <value>localhost:9001</value>
      </property>
      <property>
              <name>mapred.system.dir</name>
              <value>/home/dumbo/mapred/system</value>
              <final>true</final>
      </property>
 </configuration>
nano /home/dumbo/hadoop/etc/hadoop/yarn-site.xml
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
</configuration>
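If you'd rather not hand-edit XML in nano, a heredoc does the same job and is easy to re-run on a fresh box. This is a sketch: the function name `write_yarn_site` is hypothetical, it assumes the config directory is $HADOOP_HOME/etc/hadoop unless you pass another one, and the class property is spelled `yarn.nodemanager.aux-services.mapreduce_shuffle.class` so that it matches the `mapreduce_shuffle` service name.

```shell
#!/usr/bin/env bash
# Sketch: write yarn-site.xml without opening an editor.
# $1 = config directory (defaults to $HADOOP_HOME/etc/hadoop as used in this post).
write_yarn_site() {
    local conf_dir="${1:-$HADOOP_HOME/etc/hadoop}"
    if [ ! -d "$conf_dir" ]; then
        echo "config dir not found: $conf_dir" >&2
        return 1
    fi
    # Quoted 'EOF': the XML is written verbatim, with no shell expansion inside.
    cat > "$conf_dir/yarn-site.xml" <<'EOF'
<?xml version="1.0"?>
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
</configuration>
EOF
}
```

The same pattern works for core-site.xml, hdfs-site.xml and mapred-site.xml. Note that it overwrites the whole file rather than merging into it.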

Ok, now we need to make sure that Hadoop can spawn new 'dumbo' shell sessions via ssh without a password (that's how its scripts operate)...

ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
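A quick sanity check on the key setup: this sketch (the function name `check_ssh_keys` is hypothetical, and it assumes the ~/.ssh/id_dsa.pub key name used above) confirms the public key landed in authorized_keys and that the file isn't group- or world-writable, since sshd's StrictModes check ignores such a file.

```shell
#!/usr/bin/env bash
# Sketch: verify the passwordless-ssh setup from the two commands above.
# $1 = ssh directory (defaults to ~/.ssh); key name id_dsa as used in this post.
check_ssh_keys() {
    local ssh_dir="${1:-$HOME/.ssh}"
    local pub="$ssh_dir/id_dsa.pub" auth="$ssh_dir/authorized_keys"
    if [ ! -r "$pub" ] || [ ! -r "$auth" ]; then
        echo "missing $pub or $auth" >&2
        return 1
    fi
    # -x: whole-line match, -F: fixed string (no regex interpretation of the key)
    if ! grep -qxF -- "$(cat "$pub")" "$auth"; then
        echo "public key not found in $auth" >&2
        return 1
    fi
    # sshd's StrictModes ignores authorized_keys if group/world can write it
    if [ -n "$(find "$auth" -perm /022)" ]; then
        echo "$auth is group/world writable; try: chmod 600 $auth" >&2
        return 1
    fi
}
```

Once it passes, `ssh localhost` should log you in without a password prompt (the very first connection will still ask you to accept the host key).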

We need to make sure that 127.0.0.1 maps only to localhost in /etc/hosts, not to your literal hostname (otherwise you can get loopback connection issues).

sudo nano /etc/hosts

#for example
xxx.xxx.xx.1xx  innsmouth #my ip / my hostname
127.0.0.1       localhost
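On a stock Ubuntu install the hostname is often mapped to 127.0.1.1 rather than 127.0.0.1, which causes the same kind of loopback trouble. The sketch below (the function name `loopback_ok` is hypothetical) flags your hostname on any 127.x.x.x line, ignoring comments; it takes the hostname and hosts file as arguments, defaulting to the output of `hostname` and /etc/hosts.

```shell
#!/usr/bin/env bash
# Sketch: fail if the given hostname is mapped to any 127.x.x.x address.
# $1 = hostname (default: `hostname`), $2 = hosts file (default: /etc/hosts).
loopback_ok() {
    local host="${1:-$(hostname)}" hosts_file="${2:-/etc/hosts}"
    # awk strips comments, then prints every name listed on a 127.* line;
    # grep -x looks for an exact match on our hostname.
    if awk '{ sub(/#.*/, "") } $1 ~ /^127\./ { for (i = 2; i <= NF; i++) print $i }' "$hosts_file" \
        | grep -qxF -- "$host"; then
        echo "$host maps to loopback in $hosts_file; point it at the real IP instead" >&2
        return 1
    fi
}
```

For example, `loopback_ok innsmouth` returns non-zero while a `127.0.1.1 innsmouth` line is still present.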

Time to "format" our hadoop HDFS 'filesystem'...

~/hadoop/bin/hadoop namenode -format

Let's start it all up and hope for the best...

~/hadoop/sbin/hadoop-daemon.sh start namenode
~/hadoop/sbin/hadoop-daemon.sh start datanode
~/hadoop/sbin/yarn-daemon.sh start resourcemanager
~/hadoop/sbin/yarn-daemon.sh start nodemanager
~/hadoop/sbin/mr-jobhistory-daemon.sh start historyserver

If all is good, you shouldn't get any errors, and running 'jps' should list all five daemons (plus Jps itself):

jps
1239 JobHistoryServer
1493 Jps
993 DataNode
934 NameNode
1167 NodeManager
1106 ResourceManager
...etc
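Rather than eyeballing the list, you can pipe `jps` into a small checker. This is a sketch (the function name `check_daemons` is hypothetical) that assumes jps's usual `<pid> <MainClass>` output and the five daemons started above.

```shell
#!/usr/bin/env bash
# Sketch: read `jps` output on stdin and report any expected daemon that is missing.
check_daemons() {
    local running name missing=0
    # Keep only the class-name column ("934 NameNode" -> "NameNode").
    running=$(awk '{ print $2 }')
    for name in NameNode DataNode ResourceManager NodeManager JobHistoryServer; do
        # Exact whole-line match, so SecondaryNameNode doesn't count as NameNode.
        if ! printf '%s\n' "$running" | grep -qxF -- "$name"; then
            echo "not running: $name" >&2
            missing=1
        fi
    done
    return "$missing"
}
```

`jps | check_daemons` prints any missing daemon to stderr and returns non-zero, which makes it handy at the end of a startup script.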

Let's move on to Hive

Don't worry, way less to do here.

cd ~
wget http://apache.osuosl.org/hive/hive-0.12.0/hive-0.12.0-bin.tar.gz
tar -zxvf hive-0.12.0-bin.tar.gz
mv ~/hive-0.12.0-bin ~/hive
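At this point Java, Hadoop and Hive should all be on disk, so it's a good moment to make sure the variables from ~/.bashrc point at real directories. A minimal sketch (the function name `check_env` is hypothetical), assuming the variables were exported as in the .bashrc above:

```shell
#!/usr/bin/env bash
# Sketch: check that JAVA_HOME, HADOOP_HOME and HIVE_HOME are exported
# and point at existing directories.
check_env() {
    local var value status=0
    for var in JAVA_HOME HADOOP_HOME HIVE_HOME; do
        # printenv prints an exported variable's value (nothing if unset).
        value=$(printenv "$var")
        if [ -z "$value" ]; then
            echo "$var is not set (did you source ~/.bashrc?)" >&2
            status=1
        elif [ ! -d "$value" ]; then
            echo "$var points to a missing directory: $value" >&2
            status=1
        fi
    done
    return "$status"
}
```

Run `check_env && echo "environment looks sane"`; every problem found is listed on stderr, not just the first one.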

Only one config file to set up / create (to load the lib that we will get in the next step)

nano ~/hive/conf/hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
        <property>
             <name>hive.aux.jars.path</name>
             <value>/aux_lib/</value>
        </property>
</configuration>

Create some dirs in HDFS for Hive...

# -p creates missing parents (/user and /user/hive don't exist yet on a fresh HDFS)
hadoop fs -mkdir -p    /tmp
hadoop fs -mkdir -p    /user/hive/warehouse
hadoop fs -chmod g+w   /tmp
hadoop fs -chmod g+w   /user/hive/warehouse

You should now be able to run 'hive' without seeing any errors on launch (then just type 'quit;' to exit).

The ES "plug-in" part

cd ~
wget https://download.elasticsearch.org/hadoop/hadoop-latest.zip
unzip ./hadoop-latest.zip

mkdir /home/dumbo/hive/aux_lib/ # creating a folder for our new jar
cp ~/elasticsearch-hadoop/dist/elasticsearch-hadoop-1.3.0.M1-yarn.jar /home/dumbo/hive/aux_lib/
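The jar name changes with every elasticsearch-hadoop release, so `hadoop-latest.zip` may no longer give you 1.3.0.M1. This sketch (the function name `find_es_hadoop_jar` is hypothetical) looks for exactly one `*-yarn.jar` in the unzipped dist folder instead of hard-coding the version. It assumes the zip keeps the `elasticsearch-hadoop/dist` layout used above, which may not hold for later releases.

```shell
#!/usr/bin/env bash
# Sketch: print the path of the single elasticsearch-hadoop *-yarn.jar in a dist dir.
# $1 = dist directory (defaults to the unzip location used in this post).
find_es_hadoop_jar() {
    local dist_dir="${1:-$HOME/elasticsearch-hadoop/dist}"
    local matches count
    matches=$(find "$dist_dir" -maxdepth 1 -type f -name 'elasticsearch-hadoop-*-yarn.jar' 2>/dev/null)
    # Count non-empty lines; zero or several matches are both treated as errors.
    count=$(printf '%s' "$matches" | grep -c .)
    if [ "$count" -ne 1 ]; then
        echo "expected exactly one *-yarn.jar in $dist_dir, found $count" >&2
        return 1
    fi
    printf '%s\n' "$matches"
}
```

Usage: `cp "$(find_es_hadoop_jar)" /home/dumbo/hive/aux_lib/`. If you do end up with a different jar, remember to adjust the `ADD JAR` path later on too.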

Bit of a curveball here: time to copy the lib to an HDFS folder so remote connections can find it as well as local ones.

hadoop fs -mkdir /aux_lib
hadoop fs -copyFromLocal /home/dumbo/hive/aux_lib/* hdfs:///aux_lib
hadoop fs -ls /aux_lib # to check that the jar made it

Booya. We should be all configured to make the magic happen now. Let's start up an interactive Hive SQL shell and check it out.

hive --auxpath /home/dumbo/hive/aux_lib/ --config /home/dumbo/hive/conf/
13/12/17 04:52:01 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive
13/12/17 04:52:01 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
13/12/17 04:52:01 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
13/12/17 04:52:01 INFO Configuration.deprecation: mapred.min.split.size.per.rack is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.rack
13/12/17 04:52:01 INFO Configuration.deprecation: mapred.min.split.size.per.node is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.node
13/12/17 04:52:01 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
13/12/17 04:52:01 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative

Logging initialized using configuration in jar:file:/home/dumbo/hive/lib/hive-common-0.12.0.jar!/hive-log4j.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/dumbo/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/dumbo/hive/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
hive>

Fuckin' A right! If you see the above, you are master of all you survey, or something like that. Anyways, with the ES plugin we are creating an external table def that maps directly to our ES index / ES query.

-- typically you'd need to run this first
ADD JAR /home/dumbo/hive/aux_lib/elasticsearch-hadoop-1.3.0.M1-yarn.jar;
-- but it should be loaded automatically via our config (shown here for reference)


-- anyways
CREATE EXTERNAL TABLE wikitable (
    title string,
    redirect_page string )
STORED BY 'org.elasticsearch.hadoop.hive.ESStorageHandler'
TBLPROPERTIES('es.resource' = 'wikipedia_river/page/_search?q=*',
              'es.host' = 'localhost',
              'es.port' = '9200');

Notice the ES vars. In this case I'm running a simple ES client node on the Hadoop server (no data, no master) and connecting to that, but it could be anywhere on your network; it shouldn't matter.

You should see a response like this.

OK
Time taken: 6.11 seconds

Let's give it some work to do.

select count(distinct title) from wikitable;

...and the MapReduce train gets rollin'.

Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
Starting Job = job_1387231099006_0004, Tracking URL = http://innsmouth:8088/proxy/application_1387231099006_0004/
Kill Command = /home/dumbo/hadoop/bin/hadoop job  -kill job_1387231099006_0004
Hadoop job information for Stage-1: number of mappers: 5; number of reducers: 1
2013-12-17 05:03:33,378 Stage-1 map = 0%,  reduce = 0%
2013-12-17 05:04:16,198 Stage-1 map = 1%,  reduce = 0%, Cumulative CPU 47.58 sec
2013-12-17 05:04:17,231 Stage-1 map = 1%,  reduce = 0%, Cumulative CPU 47.58 sec
2013-12-17 05:04:18,274 Stage-1 map = 1%,  reduce = 0%, Cumulative CPU 49.59 sec
2013-12-17 05:04:19,354 Stage-1 map = 1%,  reduce = 0%, Cumulative CPU 52.76 sec
2013-12-17 05:04:20,398 Stage-1 map = 1%,  reduce = 0%, Cumulative CPU 52.76 sec
...

It's a bit of an odd (and slow) example (especially on my small VM setup / example data), since in pure ES you'd just run a faceted open query on title, but it shows that we can talk to ES using Hive SQL. Also, the plugin is getting better all the time and should optimize queries better in the future.

Now we have a Hive "wikitable" object that can be treated just like any other hive table (via Tableau or other). However, did you notice the ElasticSearch URL query string during table creation (wikipedia_river/page/_search?q=*)?

This essentially allows us to run a query / filter / etc. in native ES BEFORE it gets abstracted and interpreted by Hive. For example...

CREATE EXTERNAL TABLE wikitallica (  title string, redirect_page string )
STORED BY 'org.elasticsearch.hadoop.hive.ESStorageHandler'
TBLPROPERTIES('es.resource' = 'wikipedia_river/page/_search?q=metallica',
              'es.host' = 'localhost',
              'es.port' = '9200');

See q=metallica? It's going to do a regular full-text search on all fields for 'metallica' before passing the result set to Hive SQL (q=title:metallica would restrict the match to the title field).
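Because that query string is plain ES URI search, you can try it against ES directly before baking it into a table def, and see how many documents Hive will have to chew through. A sketch using curl (the function name `es_hits` is hypothetical; it assumes ES is reachable on localhost:9200 as in the TBLPROPERTIES above). `size=0` asks for the hit count without returning any documents.

```shell
#!/usr/bin/env bash
# Sketch: ask ES directly how many docs a URI query matches.
# $1 = index/type, $2 = query string (the q= part), $3/$4 = host/port.
es_hits() {
    local index_type="$1" query="$2" host="${3:-localhost}" port="${4:-9200}"
    # -G turns the --data* parameters into a URL query string;
    # --data-urlencode escapes spaces, colons, etc. in the query;
    # -f makes HTTP errors (e.g. a missing index) return non-zero.
    curl -sfG "http://$host:$port/$index_type/_search" \
        --data-urlencode "q=$query" \
        --data "size=0"
}
```

`es_hits wikipedia_river/page metallica` returns the raw JSON response; the total count is under `hits.total`.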

Powerful leverage indeed... now if only we could pass parameters from query to table def...
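Hive won't do that for you, but as a shell-side workaround you can at least template the table def. This is a sketch, not part of the plugin: the function `es_table_ddl` is hypothetical and emits the same 1.3.0.M1-style DDL used above (`ESStorageHandler`, with the query inside `es.resource`). It does no escaping, so keep single quotes out of the arguments.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print a 1.3.0.M1-style external table def.
# $1 = Hive table name, $2 = es.resource (index/type/_search?q=...),
# $3 = Hive column list, $4/$5 = ES host/port (default localhost:9200).
es_table_ddl() {
    local table="$1" resource="$2" columns="$3"
    local host="${4:-localhost}" port="${5:-9200}"
    # Unquoted EOF: $variables expand, but '*' and '?' are not globbed here.
    cat <<EOF
CREATE EXTERNAL TABLE $table ( $columns )
STORED BY 'org.elasticsearch.hadoop.hive.ESStorageHandler'
TBLPROPERTIES('es.resource' = '$resource',
              'es.host' = '$host',
              'es.port' = '$port');
EOF
}
```

For example, `es_table_ddl wikitallica 'wikipedia_river/page/_search?q=metallica' 'title string, redirect_page string' > wikitallica.hql`, which you should then be able to run with `hive --auxpath /home/dumbo/hive/aux_lib/ -f wikitallica.hql`.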

Tableau time

First, make sure that Hive is launched as an external service (defaults to port 10000)

hive --auxpath /home/dumbo/hive/aux_lib/ --config /home/dumbo/hive/conf/ --service hiveserver
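HiveServer can take a few seconds to come up, and Tableau will just report a connection failure if you're too quick. This sketch (the function name `wait_for_port` is hypothetical; bash only, since it relies on bash's /dev/tcp redirection) polls until something accepts connections on the port, or gives up after a timeout in seconds.

```shell
#!/usr/bin/env bash
# Sketch: wait until something accepts TCP connections on host:port.
# $1 = host, $2 = port, $3 = timeout in seconds (default 60). Bash only:
# /dev/tcp/HOST/PORT is a bash redirection feature, not a real file.
wait_for_port() {
    local host="$1" port="$2" timeout="${3:-60}" waited=0
    # The subshell opens (and immediately closes) a connection; it succeeds
    # only if the port is listening.
    until (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; do
        if [ "$waited" -ge "$timeout" ]; then
            echo "nothing listening on $host:$port after ${timeout}s" >&2
            return 1
        fi
        sleep 1
        waited=$((waited + 1))
    done
}
```

If you don't want to keep a terminal open, you could background the server with `nohup hive --auxpath /home/dumbo/hive/aux_lib/ --config /home/dumbo/hive/conf/ --service hiveserver > hiveserver.log 2>&1 &` and then run `wait_for_port localhost 10000 120` before connecting from Tableau.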

Easy. Install the MapR Hive ODBC Connector and fire up Tableau.

And you are "off to the races" as they say...


Errors? Issues? Let me know in the comments.

As with any EPIC nuts n' bolts type How-To post, there are bound to be errors and typos in v1 - and there are a lot of moving parts in this one...

[ Drawings part of Kenn Mortensen's amazing Post-It Monstres collection ]

  • Dustin Smith

    Every time this phrase “…or can I?” is in one of your posts, I know I probably need to clear my schedule for the rest of the day. The number of times I can use the excuse “the reason this isn’t done is because I was on ryrobes.com again” is wearing thin.

    Seriously, fantastic stuff. NoSQL analytics discussion is one I’m currently watching really closely.

    P.S. Lovecraft references are clearly necessary for any serious technology learning.

    • http://ryrobes.com/ Ryan Robitaille

      Thanks Dustin. Yeah – it’s an interesting space indeed. This may not be the fastest or most elegant solution… but Slow Access > No Access!

  • Costin Leau

    # disclaimer – I’m a committer on Elasticsearch Hadoop project and employee at Elasticsearch

    Hi,

    Thanks for sharing. Regarding aggregation operations (such as count) in Hive and Pig – we are working on it; there aren’t many hooks out there for us to push this operation down to the loader and without them, the data needs to be ‘streamed’ to Hadoop in order to compute the operation.
    Elasticsearch Hadoop upcoming 1.3 M2 release already has several improvements behind the scenes (such as projection) and we intend to support ‘push down filtering’ as well.

    Cheers!

    P.S. there’s no need to specify es.host and es.port as the defaults are localhost, 9200 respectively.

    • Nitin Ahuja

      Hi Costin,

      I integrated Tableau with Elastic Search. It worked great when mapping in elastic search is simple, i.e. just key value pairs.

      I flattened out some key : {object} fields using dot ‘.’ Eg. hiveColumn: ESkey.fieldONe.

      However, I am having trouble mapping arrays in Elastic to columns in Hive. In Hive, my column type is Array<Struct>
      All the arrays in this column are pulled as JSON string in Tableau. Can you suggest some way we can flatten that JSON so that in Tableau they are just columns.

      We can’t use any JSON parser in Hive as column type is Array in hive not String.

      I would appreciate any help regarding that.

      • Aishwarya

        Dear Nitin,

        Apologies for jumping randomly into this conversation.

        I am trying to integrate Elasticsearch with Tableau but found no good link that would help me out. I found the link below:

        https://github.com/mradamlacey/elasticsearch-tableau-connector

        But the procedure was not straightforward and sometimes led me nowhere. Could you please share any link or documentation that you followed?

        Cheers,
        Aishwarya.M

  • Zehan Kebrar

    Can you point out why we should use Hadoop? Isn't Elasticsearch a map/reduce system already? I really couldn't understand why you used Hadoop, and why I should use it. If you could explain, I would be happy.

    • tdecaux

      Because Tableau has no ES connector …

  • sunil khuntia

    Hi,
    Thanks for the article.
    I have a question.
    Can we join two external tables (Elasticsearch) using this HiveQL?

    Actually, I tried to join two tables but it doesn't work.
    The query didn't fail as such, but in the result I got “no data available”.
    So, basically, the join condition that I am providing isn't working, though I personally checked that there are lots of common records in those two tables in the columns I am trying to join.

    I can't understand this behaviour.
    Appreciate any help on this.

    Thanks

  • Costin Leau

    An update on folks upgrading to Elasticsearch-Hadoop 1.3.M2 or higher, the StorageHandler has changed (from ESStorageHandler to EsStorageHandler).
    Additionally the query now sits in a separate field ‘es.query’ – if one wants all the results it can simply leave the field blank or not even specify it.

    That is, the ‘wikitable’ definition (searching for wikipedia entries matching ‘buckethead’) becomes:

    CREATE EXTERNAL TABLE wikitable (
    title string,
    redirect_page string )
    STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
    TBLPROPERTIES('es.resource' = 'wikipedia_river/page',
    'es.query' = '?q=buckethead',
    'es.host' = 'localhost',
    'es.port' = '9200');

    • Matthew

      +1 for mentioning Buckethead.

    • Prakash

      Please check the below error……..

      Hadoop – 2.6
      Hive – apache-hive-1.2.0-bin
      Elasticsearch – 1.4.4
      aux_lib – elasticsearch-hadoop-2.1.0.BUILD-20150606.023546-425.jar
      ——————————————-
      hive> select count(distinct title) from wikitable;

      Job Submission failed with exception ‘org.elasticsearch.hadoop.EsHadoopIllegalArgumentException(Index [wikipedia_river/page] missing and settings [es.field.read.empty.as.null] is set to false)’
      FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask

      • karthik

        I have the same problem. Did anyone provide solution ?

  • Punit Shah

    Can't we create a TDE file by firing queries at Elasticsearch and providing it to Tableau?

  • haninne

    Hello, pls I wanna do the reverse, I mean I wanna get data from HDFS and put it in elasticsearch. can I have some help pls

  • SK

    How to add TBLPROPERTIES to mark non_analyzed fields from Hive to Elasticsearch

  • neeraj

    please use EsStorageHandler instead of ESStorageHandler. then it will work properly.

  • Nox

    Tried this and it's an order of magnitude slower than, say, the same table hosted natively in Hive. Any other tricks to get the same lightning-fast response as native Elasticsearch?

  • Sun

    Hi,

    I need help. I am using elasticsearch-hadoop-1.3.0.M3.jar for uploading data from Hive to Elasticsearch, but every time I upload the data to Elasticsearch, it gives me a different count. When I put a limit in the Hive query, it works fine.

    My External table schema is:

    CREATE EXTERNAL TABLE test_05012015(
    MEMBERSHIP_ID BIGINT
    ) ROW FORMAT DELIMITED
    STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
    TBLPROPERTIES('es.resource' = 'es_test_05012015/del', 'es.nodes' = 'xxxxx', 'es.port' = '9200', 'es.mapping.id' = 'membership_id');

    Actual query

    INSERT OVERWRITE TABLE test_05012015
    SELECT MEMBERSHIP_ID
    FROM test_ALL_05012015;

    And this is how I put a limit on the upload:

    INSERT OVERWRITE TABLE test_05012015
    SELECT MEMBERSHIP_ID
    FROM test_ALL_05012015 limit 11410858;

  • Diego Pereyra

    Thanks for sharing!
    I'm collecting data into Elasticsearch normally, but I want to query it with SQL. Is it possible to do Hive SQL -> ES integration??

  • Glorial

    Dear Costin, I created an external table based on ES using the latest elasticsearch-hadoop jar package, but I got the following error:
    java.lang.NoClassDefFoundError: org/elasticsearch/hadoop/hive/EsHiveInputFormat.
    How can I solve this? Thank you